<?php

/**
 * Exercises the JobQueue API (push/pop/ack, de-duplication, ordering) against
 * several queue configurations built from one backend.
 *
 * The backend defaults to JobQueueDB; passing the "use-jobqueue" CLI argument
 * selects an entry of $wgJobTypeConf instead. Configurations the backend
 * rejects are left as null and the tests that need them are skipped.
 *
 * @group JobQueue
 * @group medium
 * @group Database
 */
class JobQueueTest extends MediaWikiTestCase {
	/**
	 * Per-fixture settings, keyed by the name of the property holding the queue.
	 * Single source of truth for both setUp() and tearDown().
	 *
	 * @var array[]
	 */
	private static $queueVariants = [
		'queueRand' => [ 'order' => 'random', 'claimTTL' => 0 ],
		'queueRandTTL' => [ 'order' => 'random', 'claimTTL' => 10 ],
		'queueTimestamp' => [ 'order' => 'timestamp', 'claimTTL' => 0 ],
		'queueTimestampTTL' => [ 'order' => 'timestamp', 'claimTTL' => 10 ],
		'queueFifo' => [ 'order' => 'fifo', 'claimTTL' => 0 ],
		'queueFifoTTL' => [ 'order' => 'fifo', 'claimTTL' => 10 ],
	];

	/** @var mixed Not used by this class; kept for backward compatibility */
	protected $key;

	/** @var JobQueue|null Random order, claimTTL 0 */
	protected $queueRand;
	/** @var JobQueue|null Random order, claimTTL 10 */
	protected $queueRandTTL;
	/** @var JobQueue|null Timestamp order, claimTTL 0 */
	protected $queueTimestamp;
	/** @var JobQueue|null Timestamp order, claimTTL 10 */
	protected $queueTimestampTTL;
	/** @var JobQueue|null FIFO order, claimTTL 0 */
	protected $queueFifo;
	/** @var JobQueue|null FIFO order, claimTTL 10 */
	protected $queueFifoTTL;

	/**
	 * Registers the 'job' table in $tablesUsed.
	 *
	 * @param string|null $name
	 * @param array $data
	 * @param string $dataName
	 */
	public function __construct( $name = null, array $data = [], $dataName = '' ) {
		parent::__construct( $name, $data, $dataName );

		$this->tablesUsed[] = 'job';
	}

	/**
	 * Builds one queue per entry of self::$queueVariants.
	 *
	 * @throws MWException If "use-jobqueue" names a missing $wgJobTypeConf entry
	 */
	protected function setUp() {
		global $wgJobTypeConf;
		parent::setUp();

		$name = $this->getCliArg( 'use-jobqueue' );
		if ( $name ) {
			if ( !isset( $wgJobTypeConf[$name] ) ) {
				throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
			}
			$baseConfig = $wgJobTypeConf[$name];
		} else {
			$baseConfig = [ 'class' => 'JobQueueDB' ];
		}
		$baseConfig['type'] = 'null';
		$baseConfig['wiki'] = wfWikiID();

		foreach ( self::$queueVariants as $q => $settings ) {
			try {
				// Variant settings take precedence over the base configuration
				$this->$q = JobQueue::factory( $settings + $baseConfig );
			} catch ( MWException $e ) {
				// Presumably the backend does not support this order/claimTTL
				// combination; leave the fixture unset so dependent tests skip.
				// @todo What if it was another error?
				$this->$q = null;
			}
		}
	}

	/**
	 * Empties and drops every queue fixture that setUp() managed to create.
	 */
	protected function tearDown() {
		parent::tearDown();
		foreach ( array_keys( self::$queueVariants ) as $q ) {
			if ( $this->$q ) {
				$this->$q->delete();
			}
			$this->$q = null;
		}
	}

	/**
	 * Returns the queue fixture stored in the given property, or marks the
	 * current test as skipped when setUp() could not create it.
	 *
	 * @param string $name Fixture property name, e.g. 'queueFifo'
	 * @param string $desc Description of the fixture, used as the skip message
	 * @return JobQueue
	 */
	private function getQueueOrSkip( $name, $desc ) {
		$queue = $this->$name;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		return $queue;
	}

	/**
	 * @dataProvider provider_queueLists
	 * @covers JobQueue::getWiki
	 */
	public function testGetWiki( $queue, $recycles, $desc ) {
		$queue = $this->getQueueOrSkip( $queue, $desc );
		$this->assertEquals( wfWikiID(), $queue->getWiki(), "Proper wiki ID ($desc)" );
	}

	/**
	 * @dataProvider provider_queueLists
	 * @covers JobQueue::getType
	 */
	public function testGetType( $queue, $recycles, $desc ) {
		$queue = $this->getQueueOrSkip( $queue, $desc );
		$this->assertEquals( 'null', $queue->getType(), "Proper job type ($desc)" );
	}

	/**
	 * Pushes, pops and acknowledges jobs, checking the size and acquired
	 * counters after every step, then verifies that delete() empties the queue.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue
	 */
	public function testBasicOperations( $queue, $recycles, $desc ) {
		$queue = $this->getQueueOrSkip( $queue, $desc );

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$this->assertNull( $queue->push( $this->newJob() ), "Push worked ($desc)" );
		$this->assertNull( $queue->batchPush( [ $this->newJob() ] ), "Push worked ($desc)" );

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 2, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
		$jobs = iterator_to_array( $queue->getAllQueuedJobs() );
		$this->assertCount( 2, $jobs, "Queue iterator size is correct ($desc)" );

		$job1 = $queue->pop();
		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );

		$queue->flushCaches();
		// The acquired count is only asserted for fixtures flagged as recycling
		if ( $recycles ) {
			$this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
		}

		$job2 = $queue->pop();
		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		if ( $recycles ) {
			$this->assertEquals( 2, $queue->getAcquiredCount(), "Active job count ($desc)" );
		}

		$queue->ack( $job1 );

		$queue->flushCaches();
		if ( $recycles ) {
			$this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
		}

		$queue->ack( $job2 );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );

		$this->assertNull( $queue->batchPush( [ $this->newJob(), $this->newJob() ] ),
			"Push worked ($desc)" );
		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->delete();
		$queue->flushCaches();
		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
	}

	/**
	 * Pushes identical 'removeDuplicates' jobs twice in batches of three and
	 * expects the queue to hold a single job each time.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue
	 */
	public function testBasicDeduplication( $queue, $recycles, $desc ) {
		$queue = $this->getQueueOrSkip( $queue, $desc );

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$this->assertNull(
			$queue->batchPush(
				[ $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ]
			),
			"Push worked ($desc)"
		);

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		$this->assertNull(
			$queue->batchPush(
				[ $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ]
			),
			"Push worked ($desc)"
		);

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		$job1 = $queue->pop();
		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		if ( $recycles ) {
			$this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
		}

		$queue->ack( $job1 );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );
	}

	/**
	 * Re-pushes a de-duplicated job while its twin is claimed and checks that
	 * acknowledging the twin leaves the second copy poppable.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue
	 */
	public function testDeduplicationWhileClaimed( $queue, $recycles, $desc ) {
		$queue = $this->getQueueOrSkip( $queue, $desc );

		$job = $this->newDedupedJob();
		$queue->push( $job );

		// De-duplication does not apply to already-claimed jobs
		$j = $queue->pop();
		$queue->push( $job );
		$queue->ack( $j );

		$j = $queue->pop();
		// Make sure ack() of the twin did not delete the sibling data
		$this->assertInstanceOf( 'NullJob', $j, "Sibling job survived ack() of its twin ($desc)" );
	}

	/**
	 * Enqueues five jobs for each of two root job signatures that differ only
	 * by timestamp, then expects five of the ten popped jobs to be DuplicateJob
	 * instances.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue
	 */
	public function testRootDeduplication( $queue, $recycles, $desc ) {
		$queue = $this->getQueueOrSkip( $queue, $desc );

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$id = wfRandomString( 32 );
		$root1 = Job::newRootJobParams( "nulljobspam:$id" ); // task ID/timestamp
		for ( $i = 0; $i < 5; ++$i ) {
			$this->assertNull( $queue->push( $this->newJob( 0, $root1 ) ), "Push worked ($desc)" );
		}
		$queue->deduplicateRootJob( $this->newJob( 0, $root1 ) );

		$root2 = $root1;
		// Add one second to the root timestamp. Convert through wfTimestamp()
		// rather than strtotime() so the TS_MW value is read as UTC no matter
		// what PHP's default timezone is.
		$root2_ts = (int)wfTimestamp( TS_UNIX, $root2['rootJobTimestamp'] ) + 1;
		$root2['rootJobTimestamp'] = wfTimestamp( TS_MW, $root2_ts );

		$this->assertNotEquals( $root1['rootJobTimestamp'], $root2['rootJobTimestamp'],
			"Root job signatures have different timestamps." );
		for ( $i = 0; $i < 5; ++$i ) {
			$this->assertNull( $queue->push( $this->newJob( 0, $root2 ) ), "Push worked ($desc)" );
		}
		$queue->deduplicateRootJob( $this->newJob( 0, $root2 ) );

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 10, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		// Drain the queue, counting how many jobs came back as duplicates
		$dupcount = 0;
		$jobs = [];
		do {
			$job = $queue->pop();
			if ( $job ) {
				$jobs[] = $job;
				$queue->ack( $job );
			}
			if ( $job instanceof DuplicateJob ) {
				++$dupcount;
			}
		} while ( $job );

		$this->assertCount( 10, $jobs, "Correct number of jobs popped ($desc)" );
		$this->assertEquals( 5, $dupcount, "Correct number of duplicate jobs popped ($desc)" );
	}

	/**
	 * Pushes ten numbered jobs and expects them to be popped in push order.
	 *
	 * @dataProvider provider_fifoQueueLists
	 * @covers JobQueue
	 */
	public function testJobOrder( $queue, $recycles, $desc ) {
		$queue = $this->getQueueOrSkip( $queue, $desc );

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		for ( $i = 0; $i < 10; ++$i ) {
			$this->assertNull( $queue->push( $this->newJob( $i ) ), "Push worked ($desc)" );
		}

		for ( $i = 0; $i < 10; ++$i ) {
			$job = $queue->pop();
			$this->assertInstanceOf( 'Job', $job, "Jobs popped from queue ($desc)" );
			$params = $job->getParams();
			$this->assertEquals( $i, $params['i'], "Job popped from queue is FIFO ($desc)" );
			$queue->ack( $job );
		}

		// Nothing is left to pop once all ten jobs were taken
		$this->assertFalse( $queue->pop(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
	}

	/**
	 * Checks that the queue shows up in getServerQueuesWithJobs() only after a
	 * job was pushed. Skipped when the backend has no such method.
	 *
	 * @covers JobQueue
	 */
	public function testQueueAggregateTable() {
		$queue = $this->queueFifo;
		if ( !$queue || !method_exists( $queue, 'getServerQueuesWithJobs' ) ) {
			$this->markTestSkipped(
				'FIFO queue unavailable or backend lacks getServerQueuesWithJobs()'
			);
		}

		$this->assertNotContains(
			[ $queue->getType(), $queue->getWiki() ],
			$queue->getServerQueuesWithJobs(),
			"Null queue not in listing"
		);

		$queue->push( $this->newJob( 0 ) );

		$this->assertContains(
			[ $queue->getType(), $queue->getWiki() ],
			$queue->getServerQueuesWithJobs(),
			"Null queue in listing"
		);
	}

	/**
	 * Data provider covering every queue fixture.
	 *
	 * @return array[] Rows of [ fixture property name, expect acquired counts, description ]
	 */
	public static function provider_queueLists() {
		return [
			[ 'queueRand', false, 'Random queue without ack()' ],
			[ 'queueRandTTL', true, 'Random queue with ack()' ],
			[ 'queueTimestamp', false, 'Time ordered queue without ack()' ],
			[ 'queueTimestampTTL', true, 'Time ordered queue with ack()' ],
			[ 'queueFifo', false, 'FIFO ordered queue without ack()' ],
			[ 'queueFifoTTL', true, 'FIFO ordered queue with ack()' ]
		];
	}

	/**
	 * Data provider covering only the FIFO-ordered fixtures.
	 *
	 * @return array[] Rows of [ fixture property name, expect acquired counts, description ]
	 */
	public static function provider_fifoQueueLists() {
		return [
			[ 'queueFifo', false, 'Ordered queue without ack()' ],
			[ 'queueFifoTTL', true, 'Ordered queue with ack()' ]
		];
	}

	/**
	 * Creates a NullJob on the main page with 'removeDuplicates' disabled.
	 *
	 * @param int $i Value stored in the job's 'i' parameter
	 * @param array $rootJob Extra parameters (e.g. root job signature); they do
	 *  not override the fixed parameters set here
	 * @return NullJob
	 */
	public function newJob( $i = 0, $rootJob = [] ) {
		return new NullJob( Title::newMainPage(),
			[ 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 0, 'i' => $i ] + $rootJob );
	}

	/**
	 * Creates a NullJob on the main page with 'removeDuplicates' enabled.
	 *
	 * @param int $i Value stored in the job's 'i' parameter
	 * @param array $rootJob Extra parameters (e.g. root job signature); they do
	 *  not override the fixed parameters set here
	 * @return NullJob
	 */
	public function newDedupedJob( $i = 0, $rootJob = [] ) {
		return new NullJob( Title::newMainPage(),
			[ 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 1, 'i' => $i ] + $rootJob );
	}
}
